When I built my first model in PySpark, I was confused by the scarcity of available resources on it. It was through implementation that I realized data is fed into a PySpark model differently than in plain Python.
So, I am going to guide you through making your first dataset input-ready for PySpark (for any classification/regression model in PySpark).
As rightly said by W. Edwards Deming – “Without data you are just another person with an opinion.”
So, let’s first understand what data is. Data can be of any domain; our general requirements would be for it to be:
Structured
Without missing values
Without many heavy outliers
Outlier treatment depends to a major extent on the business requirement; that would be a whole other story to tell. For now, I am assuming that the data has been structured, with a properly defined schema and no missing values.
Tip: You can use PySpark’s df.fillna() function to fill in missing values (where df represents a PySpark data frame).
Fill in the missing values with 0 or the column mean as required, and cap outliers at the 1st and 99th percentiles (depending on the business requirement).
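A minimal sketch of both treatments, assuming a PySpark data frame df with a numeric column "age" (the column name is illustrative):

from pyspark.sql import functions as F

# Fill every missing value with 0 ...
# df = df.fillna(0)
# ... or fill a specific column's missing values with that column's mean
mean_age = df.select(F.mean("age")).first()[0]
df = df.fillna({"age": mean_age})

# Cap outliers at the 1st and 99th percentiles using approxQuantile
p1, p99 = df.approxQuantile("age", [0.01, 0.99], 0.0)
df = df.withColumn("age", F.when(F.col("age") < p1, p1)
                           .when(F.col("age") > p99, p99)
                           .otherwise(F.col("age")))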
Below are the main steps, followed by example snippets (and sample data for clarity):
Treat categorical/ordinal variables with String Indexer and/or One Hot Encoder.
Assemble all categorical and numerical variables that you want as input into a single column named “features”, which will be used as our final input.
(Optional) Apply normalization to the newly created column to reduce variance in the data (a sketch follows the pipeline snippet below).
The final column, features, will be a vector with values of all variables selected.
E.g., [1,4,6]
The final input for the model is a single column consisting of a vector form of all desired input columns.
Prior to combining the inputs into a vector, we need to treat the categorical/ordinal variables. This can be done by two methods:
String Indexer - Allocates a numeric index to each category in a column, e.g., Male = 0, Female = 1 (by default, the most frequent category receives index 0).
One-Hot Encoding - Transforms each category index into a binary vector with at most one nonzero value, e.g., Male = [0,1], Female = [1,0].
The One Hot Encoder will convert the column to a sparse vector; e.g., (3,[0],[1.0]) denotes a length-3 vector with a 1.0 at index 0 and zeros elsewhere.
Then, the Pipeline class is used to chain all the steps together and execute them in a single run.
A snippet:
Import required libraries
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, IndexToString, StringIndexerModel
Define the categorical and numerical features
CategoricalFeatures = ['gender']
NumericalFeatures = ['age']
Defining String Indexer for categorical features
indexers = [StringIndexer(inputCol = col, outputCol = "c_{}".format(col))
            for col in CategoricalFeatures]
One Hot encode the categorical features after string indexing

encoders = [OneHotEncoder(inputCol = "c_{}".format(col), outputCol = "o_{}".format(col))
            for col in CategoricalFeatures]
Assemble final format of categorical and numerical features
assembler = VectorAssembler(inputCols = NumericalFeatures + ["o_{}".format(col) for col in CategoricalFeatures],
                            outputCol = "features")
Form a pipeline
preprocessor = Pipeline(stages = indexers + encoders + [assembler]).fit(df)
Apply the changes to form a new dataset

df = preprocessor.transform(df)
Here, you can simply edit the lists of categorical and numerical variables at the start and the data frame name in the last line; the rest of the code can be reused as-is to transform the columns.
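The optional normalization step is not included in this pipeline; a minimal sketch using the already imported StandardScaler (the output column name "scaled_features" is illustrative):

scaler = StandardScaler(inputCol = "features", outputCol = "scaled_features",
                        withMean = False, withStd = True)
df = scaler.fit(df).transform(df)

If you normalize, point the model at the scaled column instead (featuresCol = "scaled_features").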
A similar treatment (String Indexer) can be applied to the target variable of a model, as sketched below.
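A minimal sketch, assuming a string label column named "label" (a hypothetical name) that is indexed into the "target" column used by the model further down:

labelIndexer = StringIndexer(inputCol = "label", outputCol = "target").fit(df)
df = labelIndexer.transform(df)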
The new data frame (df) can then be fed into any model in the MLlib library by setting the parameter featuresCol = “features”.
This process makes our data frame model ready!
Sample:
Before Processing:
units | spend | categorical |
358 | 2.732765363 | j |
1872 | 2.01982906 | x |
131 | 1.226641221 | t |
3015 | 2.404736318 | x |
2507 | 1.67453929 | x |
593 | 3.542462057 | x |
904 | 1.641969027 | x |
1746 | 1.591970218 | x |
627 | 1.264338118 | x |
597 | 3.133735343 | x |
1572 | 2.055585242 | x |
953 | 2.641322141 | x |
8 | 1.525 | s |
1029 | 1.795364431 | x |
2305 | 1.746277657 | x |
683 | 2.71988287 | x |
1748 | 1.705337529 | x |
1553 | 2.025692209 | x |
186 | 3.733870968 | t |
1858 | 1.815662002 | x |
After Processing:
units | spend | categorical | c_categorical | o_categorical | features |
358 | 2.732765363 | j | 3 | (3,[],[]) | (5,[0,1],[358.0,2.732765363128492]) |
1872 | 2.01982906 | x | 0 | (3,[0],[1.0]) | [1872.0,2.01982905982906,1.0,0.0,0.0] |
131 | 1.226641221 | t | 1 | (3,[1],[1.0]) | [131.0,1.2266412213740456,0.0,1.0,0.0] |
3015 | 2.404736318 | x | 0 | (3,[0],[1.0]) | [3015.0,2.40473631840796,1.0,0.0,0.0] |
2507 | 1.67453929 | x | 0 | (3,[0],[1.0]) | [2507.0,1.6745392899880336,1.0,0.0,0.0] |
593 | 3.542462057 | x | 0 | (3,[0],[1.0]) | [593.0,3.5424620573355816,1.0,0.0,0.0] |
904 | 1.641969027 | x | 0 | (3,[0],[1.0]) | [904.0,1.6419690265486722,1.0,0.0,0.0] |
1746 | 1.591970218 | x | 0 | (3,[0],[1.0]) | [1746.0,1.5919702176403205,1.0,0.0,0.0] |
627 | 1.264338118 | x | 0 | (3,[0],[1.0]) | [627.0,1.2643381180223283,1.0,0.0,0.0] |
597 | 3.133735343 | x | 0 | (3,[0],[1.0]) | [597.0,3.133735343383585,1.0,0.0,0.0] |
1572 | 2.055585242 | x | 0 | (3,[0],[1.0]) | [1572.0,2.05558524173028,1.0,0.0,0.0] |
953 | 2.641322141 | x | 0 | (3,[0],[1.0]) | [953.0,2.641322140608605,1.0,0.0,0.0] |
8 | 1.525 | s | 2 | (3,[2],[1.0]) | [8.0,1.525,0.0,0.0,1.0] |
1029 | 1.795364431 | x | 0 | (3,[0],[1.0]) | [1029.0,1.7953644314868802,1.0,0.0,0.0] |
2305 | 1.746277657 | x | 0 | (3,[0],[1.0]) | [2305.0,1.7462776572668113,1.0,0.0,0.0] |
683 | 2.71988287 | x | 0 | (3,[0],[1.0]) | [683.0,2.719882869692533,1.0,0.0,0.0] |
1748 | 1.705337529 | x | 0 | (3,[0],[1.0]) | [1748.0,1.7053375286041188,1.0,0.0,0.0] |
1553 | 2.025692209 | x | 0 | (3,[0],[1.0]) | [1553.0,2.025692208628461,1.0,0.0,0.0] |
186 | 3.733870968 | t | 1 | (3,[1],[1.0]) | [186.0,3.733870967741936,0.0,1.0,0.0] |
1858 | 1.815662002 | x | 0 | (3,[0],[1.0]) | [1858.0,1.8156620021528525,1.0,0.0,0.0] |
Here, we can observe that the categorical column has 4 distinct values (x, t, s, j), which String Indexer converts to 0, 1, 2, 3 respectively. (column c_categorical)
Further, One Hot Encoder converts each index to a 1*3 vector; by default it drops the last category, which is why j (index 3) becomes the all-zeros vector (3,[],[]). (column o_categorical)
Vector Assembler creates a vector of all features to be fed into the model. (column features)
All the above steps are added to a pipeline and executed at once.
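Since PipelineModel is already imported, the fitted preprocessor can also be persisted and reloaded later; a minimal sketch (the path is illustrative):

preprocessor.write().overwrite().save("/tmp/preprocessor")
preprocessor = PipelineModel.load("/tmp/preprocessor")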
A Step further:
Example of fitting a model:
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
train, test = df.randomSplit([0.7, 0.3], seed = 42)
lg = LogisticRegression(labelCol = "target", featuresCol = "features", maxIter = 100, regParam = 1, elasticNetParam = 1)
lgModel = lg.fit(train)
lgPreds = lgModel.transform(test)
print('Prediction on test data is done! \n')
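The transform adds prediction columns to the test data. To inspect them, and to map the numeric predictions back to the original string labels using the already imported IndexToString (a sketch reusing the hypothetical labelIndexer fitted earlier):

lgPreds.select("target", "prediction", "probability").show(5)

converter = IndexToString(inputCol = "prediction", outputCol = "predicted_label",
                          labels = labelIndexer.labels)
converter.transform(lgPreds).select("prediction", "predicted_label").show(5)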
This should clear up the pre-processing needed for modeling in PySpark!